# Source Generated with Decompyle++
# File: in.pyc (Python 2.4)

'''Storage implementation using a log written to a single file.

$Revision: 1.16 $
'''

import base64
from cPickle import Pickler, Unpickler, loads
import errno
import os
import sys
import time
import logging
from types import StringType
from struct import pack, unpack

# fsync is not available on all platforms; fall back to None if missing.
fsync = getattr(os, 'fsync', None)

from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors, \
     VersionLockError
from persistent.TimeStamp import TimeStamp
from ZODB.lock_file import LockFile
from ZODB.utils import p64, u64, cp, z64
from ZODB.FileStorage.fspack import FileStoragePacker
from ZODB.FileStorage.format import FileStorageFormatter, DataHeader, \
     TxnHeader, DATA_HDR, DATA_HDR_LEN, TRANS_HDR, TRANS_HDR_LEN, \
     CorruptedDataError
from ZODB.loglevels import BLATHER
from ZODB.fsIndex import fsIndex

packed_version = 'FS21'

logger = logging.getLogger('ZODB.FileStorage')

def panic(message, *data):
    logger.critical(message, *data)
    raise CorruptedTransactionError(message)


class FileStorageError(POSException.StorageError):
    pass

class PackError(FileStorageError):
    pass

class FileStorageFormatError(FileStorageError):
    '''Invalid file format

    The format of the given file is not valid.
    '''

class CorruptedFileStorageError(FileStorageError,
                                POSException.StorageSystemError):
    '''Corrupted file storage.'''

class CorruptedTransactionError(CorruptedFileStorageError):
    pass

class FileStorageQuotaError(FileStorageError,
                            POSException.StorageSystemError):
    '''File storage quota exceeded.'''

class RedundantPackWarning(FileStorageError):
    pass

class TempFormatter(FileStorageFormatter):
    '''Helper class used to read formatted FileStorage data.'''

    def __init__(self, afile):
        self._file = afile


class FileStorage(BaseStorage.BaseStorage,
                  ConflictResolution.ConflictResolvingStorage,
                  FileStorageFormatter):

    # Set True while a pack is in progress; used to reject concurrent packs.
    _pack_is_in_progress = False

    # Save the index after this many records have been written.
    _records_before_save = 10000

    def __init__(self, file_name, create=False, read_only=False, stop=None,
                 quota=None):

        if read_only:
            self._is_read_only = True
            if create:
                raise ValueError("can't create a read-only file")
        elif stop is not None:
            raise ValueError('time-travel only supported in read-only mode')

        if stop is None:
            stop = '\xff' * 8

        # Lock the database and set up the temp file.
        if not read_only:
            self._lock_file = LockFile(file_name + '.lock')
            self._tfile = open(file_name + '.tmp', 'w+b')
            self._tfmt = TempFormatter(self._tfile)
        else:
            self._tfile = None

        self._file_name = file_name

        BaseStorage.BaseStorage.__init__(self, file_name)

        (index, vindex, tindex, tvindex,
         oid2tid, toid2tid, toid2tid_delete) = self._newIndexes()
        self._initIndex(index, vindex, tindex, tvindex,
                        oid2tid, toid2tid, toid2tid_delete)

        # Now open the file
        self._file = None
        if not create:
            try:
                self._file = open(file_name, read_only and 'rb' or 'r+b')
            except IOError, exc:
                if exc.errno == errno.EFBIG:
                    # The file is too big to open.  Fail visibly.
                    raise
                if exc.errno == errno.ENOENT:
                    # The file doesn't exist.  Create it.
                    create = 1
                # If something else went wrong, it's hard to guess what
                # the problem was.  If the file does not exist, create it.
                # Otherwise, fail.
                if os.path.exists(file_name):
                    raise
                else:
                    create = 1

        if self._file is None and create:
            if os.path.exists(file_name):
                os.remove(file_name)
            self._file = open(file_name, 'w+b')
            self._file.write(packed_version)

        r = self._restore_index()
        if r is not None:
            self._used_index = 1  # Marker for testing
            index, vindex, start, ltid = r

            self._initIndex(index, vindex, tindex, tvindex,
                            oid2tid, toid2tid, toid2tid_delete)
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, vindex, tindex, stop,
                ltid=ltid, start=start, read_only=read_only,
                )
        else:
            self._used_index = 0  # Marker for testing
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, vindex, tindex, stop,
                read_only=read_only,
                )
            self._save_index()

        self._records_before_save = max(self._records_before_save,
                                        len(self._index))
        self._ltid = tid

        # Complain if the data file's last timestamp is in the future.
        self._ts = tid = TimeStamp(tid)
        t = time.time()
        t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
        if tid > t:
            seconds = tid.timeTime() - t.timeTime()
            complainer = logger.warning
            if seconds > 30 * 60:  # 30 minutes -- way screwed up
                complainer = logger.critical
            complainer('%s Database records %d seconds in the future',
                       file_name, seconds)

        self._quota = quota

        # tid cache statistics.
        self._oid2tid_nlookups = self._oid2tid_nhits = 0
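    # A minimal usage sketch (not part of the original module): creating and
    # using a FileStorage through ZODB's DB/transaction machinery.
    #
    #   from ZODB.FileStorage import FileStorage
    #   from ZODB.DB import DB
    #   import transaction
    #
    #   storage = FileStorage('Data.fs')    # also creates Data.fs.lock/.tmp
    #   db = DB(storage)
    #   conn = db.open()
    #   conn.root()['answer'] = 42
    #   transaction.commit()
    #   db.close()                          # close() also saves the .index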
    def _initIndex(self, index, vindex, tindex, tvindex,
                   oid2tid, toid2tid, toid2tid_delete):
        self._index = index
        self._vindex = vindex
        self._tindex = tindex
        self._tvindex = tvindex
        self._index_get = index.get
        self._vindex_get = vindex.get

        # oid -> tid cache, plus the additions and deletions to apply
        # to it at commit time.
        self._oid2tid = oid2tid
        self._toid2tid = toid2tid
        self._toid2tid_delete = toid2tid_delete

    def __len__(self):
        return len(self._index)

    def _newIndexes(self):
        # hook to use something other than builtin dict
        return fsIndex(), {}, {}, {}, {}, {}, {}

    _saved = 0

    def _save_index(self):
        '''Write the database index to a file to support quick startup.'''

        if self._is_read_only:
            return

        index_name = self.__name__ + '.index'
        tmp_name = index_name + '.index_tmp'

        f = open(tmp_name, 'wb')
        p = Pickler(f, 1)

        info = {'index': self._index, 'pos': self._pos,
                'oid': self._oid, 'vindex': self._vindex}

        p.dump(info)
        f.flush()
        f.close()

        try:
            try:
                os.remove(index_name)
            except OSError:
                pass
            os.rename(tmp_name, index_name)
        except:
            pass

        self._saved += 1
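    # A minimal sketch (not part of the original module) of reading the
    # saved index back by hand, e.g. for debugging.  It assumes a
    # 'Data.fs.index' file written by _save_index() above:
    #
    #   from cPickle import Unpickler
    #   f = open('Data.fs.index', 'rb')
    #   info = Unpickler(f).load()
    #   print info['pos'], len(info['index'])   # end-of-data pos, # of oids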
    def _clear_index(self):
        index_name = self.__name__ + '.index'
        if os.path.exists(index_name):
            try:
                os.remove(index_name)
            except OSError:
                pass

    def _sane(self, index, pos):
        '''Sanity check saved index data by reading the last undone trans

        Basically, we read the last not undone transaction and
        check to see that the included records are consistent
        with the index.  Any invalid record records or inconsistent
        object positions cause zero to be returned.
        '''
        r = self._check_sanity(index, pos)
        if not r:
            logger.warning('Ignoring index for %s', self._file_name)
        return r

    def _check_sanity(self, index, pos):
        if pos < 100:
            return 0  # insane
        self._file.seek(0, 2)
        if self._file.tell() < pos:
            return 0  # insane
        ltid = None

        max_checked = 5
        checked = 0

        while checked < max_checked:
            self._file.seek(pos - 8)
            rstl = self._file.read(8)
            tl = u64(rstl)
            pos = pos - tl - 8
            if pos < 4:
                return 0  # insane
            h = self._read_txn_header(pos)
            if not ltid:
                ltid = h.tid
            if h.tlen != tl:
                return 0  # inconsistent lengths
            if h.status == 'u':
                continue  # undone trans, search back
            if h.status not in ' p':
                return 0  # insane
            if tl < h.headerlen():
                return 0  # insane
            tend = pos + tl
            opos = pos + h.headerlen()
            if opos == tend:
                continue  # empty trans

            while opos < tend and checked < max_checked:
                # Read the data records for this transaction
                h = self._read_data_header(opos)

                if opos + h.recordlen() > tend or h.tloc != pos:
                    return 0

                if index.get(h.oid, 0) != opos:
                    return 0  # insane

                checked += 1
                opos = opos + h.recordlen()

        return ltid

    def _restore_index(self):
        '''Load database index to support quick startup.'''
        # Returns (index, vindex, pos, tid), or None in case of error.
        file_name = self.__name__
        index_name = file_name + '.index'

        try:
            f = open(index_name, 'rb')
        except:
            return None

        p = Unpickler(f)

        try:
            info = p.load()
        except:
            exc, err = sys.exc_info()[:2]
            logger.warning('Failed to load database index: %s: %s', exc, err)
            return None

        index = info.get('index')
        pos = info.get('pos')
        vindex = info.get('vindex')

        if index is None or pos is None or vindex is None:
            return None
        pos = long(pos)

        if isinstance(index, dict) or (isinstance(index, fsIndex) and
                                       isinstance(index._data, dict)):
            # Convert dictionary indexes to fsIndexes, or convert fsIndexes
            # whose _data attribute is a plain dict to a new fsIndex.
            newindex = fsIndex()
            newindex.update(index)
            index = newindex
            if not self._is_read_only:
                # Save the converted index.
                f = open(index_name, 'wb')
                p = Pickler(f, 1)
                info['index'] = index
                p.dump(info)
                f.close()
                # Now call this method again to get the new data.
                return self._restore_index()

        tid = self._sane(index, pos)
        if not tid:
            return None

        return index, vindex, pos, tid

    def close(self):
        self._file.close()
        if hasattr(self, '_lock_file'):
            self._lock_file.close()
        if self._tfile:
            self._tfile.close()
        try:
            self._save_index()
        except:
            # Log the error and continue
            logger.error('Error saving index on close()', exc_info=True)

    # Return tid of most recent record for oid if that's in the _oid2tid
    # cache, else None.  Going through this method (instead of indexing
    # _oid2tid directly) keeps the cache statistics accurate.
    def _get_cached_tid(self, oid):
        self._oid2tid_nlookups += 1
        result = self._oid2tid.get(oid)
        if result is not None:
            self._oid2tid_nhits += 1
        # Log a msg every ~8000 tries.
        if self._oid2tid_nlookups & 8191 == 0:
            logger.log(BLATHER,
                       '_oid2tid size %s lookups %s hits %s rate %.1f%%',
                       len(self._oid2tid),
                       self._oid2tid_nlookups,
                       self._oid2tid_nhits,
                       100.0 * self._oid2tid_nhits / self._oid2tid_nlookups)
        return result

    def abortVersion(self, src, transaction):
        return self.commitVersion(src, '', transaction, abort=True)

    def commitVersion(self, src, dest, transaction, abort=False):
        # We are going to commit by simply storing back pointers.
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if not (src and isinstance(src, StringType)
                and isinstance(dest, StringType)):
            raise POSException.VersionCommitError('Invalid source version')

        if src == dest:
            raise POSException.VersionCommitError(
                "Can't commit to same version: %s" % repr(src))

        if dest and abort:
            raise POSException.VersionCommitError(
                "Internal error, can't abort to a version")

        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            return self._commitVersion(src, dest, transaction, abort)
        finally:
            self._lock_release()
    def _commitVersion(self, src, dest, transaction, abort=False):
        # call after checking arguments and acquiring lock
        srcpos = self._vindex_get(src, 0)
        spos = p64(srcpos)
        # middle holds bytes 16:34 of a data record:
        #    pos of transaction, len of version, data pointer
        middle = pack('>8sH8s', p64(self._pos), len(dest), z64)

        if dest:
            sd = p64(self._vindex_get(dest, 0))
            heredelta = 66 + len(dest)
        else:
            sd = ''
            heredelta = 50

        here = self._pos + self._tfile.tell() + self._thl
        oids = []
        current_oids = {}

        while srcpos:
            h = self._read_data_header(srcpos)

            if self._index.get(h.oid) == srcpos:
                # This is a current record; write a back pointer for it.
                self._tindex[h.oid] = here
                oids.append(h.oid)
                self._tfile.write(h.oid + self._tid + spos + middle)
                if dest:
                    self._tvindex[dest] = here
                    self._tfile.write(p64(h.pnv) + sd + dest)
                    sd = p64(here)

                # On abort, back-point to the non-version data; otherwise
                # back-point to the version data being committed.
                self._tfile.write(abort and p64(h.pnv) or spos)
                here += heredelta

                current_oids[h.oid] = 1
            elif not current_oids.has_key(h.oid):
                # Hm, a non-current record and we haven't seen a current
                # one for this oid yet; we're done with the scan.
                break

            srcpos = h.vprev
            spos = p64(srcpos)

        self._toid2tid_delete.update(current_oids)
        return self._tid, oids

    def getSize(self):
        return self._pos

    def _lookup_pos(self, oid):
        try:
            return self._index[oid]
        except KeyError:
            raise POSKeyError(oid)
        except TypeError:
            raise TypeError('invalid oid %r' % (oid,))

    def loadEx(self, oid, version):
        # A variant of load() that also returns the version string.
        self._lock_acquire()
        try:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid)
            if h.version and h.version != version:
                # Return the non-version data via the pnv pointer.
                data = self._loadBack_impl(oid, h.pnv)[0]
                return data, h.tid, ''
            if h.plen:
                data = self._file.read(h.plen)
                return data, h.tid, h.version
            else:
                # The data is a backpointer, but the tid comes from
                # the current transaction.
                data = self._loadBack_impl(oid, h.back)[0]
                return data, h.tid, h.version
        finally:
            self._lock_release()

    def load(self, oid, version):
        '''Return pickle data and serial number.'''
        self._lock_acquire()
        try:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid)
            if h.version and h.version != version:
                data = self._loadBack_impl(oid, h.pnv)[0]
                return data, h.tid
            if h.plen:
                data = self._file.read(h.plen)
                return data, h.tid
            else:
                data = self._loadBack_impl(oid, h.back)[0]
                return data, h.tid
        finally:
            self._lock_release()

    def loadSerial(self, oid, serial):
        self._lock_acquire()
        try:
            pos = self._lookup_pos(oid)
            while 1:
                h = self._read_data_header(pos, oid)
                if h.tid == serial:
                    break
                pos = h.prev
                if not pos:
                    raise POSKeyError(oid)
            if h.version:
                return self._loadBack_impl(oid, h.pnv)[0]
            if h.plen:
                return self._file.read(h.plen)
            else:
                return self._loadBack_impl(oid, h.back)[0]
        finally:
            self._lock_release()

    def loadBefore(self, oid, tid):
        self._lock_acquire()
        try:
            pos = self._lookup_pos(oid)
            end_tid = None
            while True:
                h = self._read_data_header(pos, oid)
                if h.version:
                    # Follow the pnv pointer to the previous
                    # non-version data.
                    if not h.pnv:
                        # The object was created in a version; there is
                        # no non-version data that might be before tid.
                        return None
                    pos = h.pnv
                    continue

                if h.tid < tid:
                    break

                pos = h.prev
                end_tid = h.tid
                if not pos:
                    return None

            if h.back:
                data, _, _, _ = self._loadBack_impl(oid, h.back)
                return data, h.tid, end_tid
            else:
                return self._file.read(h.plen), h.tid, end_tid
        finally:
            self._lock_release()

    def modifiedInVersion(self, oid):
        self._lock_acquire()
        try:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid)
            return h.version
        finally:
            self._lock_release()
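    # A minimal sketch (not part of the original module) of the load family,
    # assuming an open storage and the root object's oid:
    #
    #   from ZODB.utils import z64
    #   data, tid = storage.load(z64, '')       # current pickle and tid
    #   old = storage.loadSerial(z64, tid)      # exact historical revision
    #   prev = storage.loadBefore(z64, tid)     # (data, tid, next_tid), or
    #                                           # None if no earlier revision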
    def store(self, oid, serial, data, version, transaction):
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            if oid > self._oid:
                self.set_max_oid(oid)
            old = self._index_get(oid, 0)
            cached_tid = None
            pnv = None
            if old:
                cached_tid = self._get_cached_tid(oid)
                if cached_tid is None:
                    h = self._read_data_header(old, oid)
                    if h.version:
                        if h.version != version:
                            raise VersionLockError(oid, h.version)
                        pnv = h.pnv
                    cached_tid = h.tid

                if serial != cached_tid:
                    rdata = self.tryToResolveConflict(oid, cached_tid,
                                                      serial, data)
                    if rdata is None:
                        raise POSException.ConflictError(
                            oid=oid, serials=(cached_tid, serial), data=data)
                    else:
                        data = rdata

            pos = self._pos
            here = pos + self._tfile.tell() + self._thl
            self._tindex[oid] = here
            new = DataHeader(oid, self._tid, old, pos, len(version),
                             len(data))

            if version:
                # We need to write some version information if this
                # revision is happening in a version.
                pv = (self._tvindex.get(version, 0)
                      or self._vindex.get(version, 0))
                if pnv is None:
                    pnv = old
                new.setVersion(version, pnv, pv)
                self._tvindex[version] = here
                self._toid2tid_delete[oid] = 1
            else:
                self._toid2tid[oid] = self._tid

            self._tfile.write(new.asString())
            self._tfile.write(data)

            # Check quota
            if self._quota is not None and here > self._quota:
                raise FileStorageQuotaError(
                    'The storage quota has been exceeded.')

            if old and serial != cached_tid:
                return ConflictResolution.ResolvedSerial
            else:
                return self._tid
        finally:
            self._lock_release()

    def _data_find(self, tpos, oid, data):
        # Return backpointer for oid in the transaction at tpos.  This is
        # a file offset to oid's data record if found, else 0.  The record
        # found should contain a pickle identical to the data argument.
        self._file.seek(tpos)
        h = self._file.read(TRANS_HDR_LEN)
        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        self._file.read(ul + dl + el)
        tend = tpos + tl + 8
        pos = self._file.tell()
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid == oid:
                # Make sure this looks like the right data record
                if h.plen == 0:
                    # This is also a backpointer.  Gotta trust it.
                    return pos
                if h.plen != len(data):
                    # The expected data doesn't match what's in the
                    # backpointer.  Something is wrong.
                    logger.error('Mismatch between data and'
                                 ' backpointer at %d', pos)
                    return 0
                _data = self._file.read(h.plen)
                if data != _data:
                    return 0
                return pos
            pos += h.recordlen()
            self._file.seek(pos)
        return 0

    def restore(self, oid, serial, data, version, prev_txn, transaction):
        # A lot like store() but without all the consistency checks.  This
        # should only be used when we *know* the data is good, hence the
        # method name.  Unlike store(), serial is the serial of *this*
        # revision (self._tid is ignored), nothing is returned, and data
        # may be None to indicate a creation that was undone.  prev_txn
        # is a backpointer hint and is ignored if the transaction it names
        # doesn't exist (e.g. it was packed away).
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            if oid > self._oid:
                self.set_max_oid(oid)
            prev_pos = 0
            if prev_txn is not None:
                prev_txn_pos = self._txn_find(prev_txn, 0)
                if prev_txn_pos:
                    prev_pos = self._data_find(prev_txn_pos, oid, data)
            old = self._index_get(oid, 0)
            # Calculate the file position in the temporary file
            here = self._pos + self._tfile.tell() + self._thl
            # And update the temp file index
            self._tindex[oid] = here
            if prev_pos:
                # If there is a valid prev_pos, don't write data.
                data = None
            if data is None:
                dlen = 0
            else:
                dlen = len(data)

            # Write the recovery data record
            new = DataHeader(oid, serial, old, self._pos, len(version), dlen)
            if version:
                pnv = self._restore_pnv(oid, old, version, prev_pos) or old
                vprev = self._tvindex.get(version, 0)
                if not vprev:
                    vprev = self._vindex.get(version, 0)
                new.setVersion(version, pnv, vprev)
                self._tvindex[version] = here
                self._toid2tid_delete[oid] = 1
            else:
                self._toid2tid[oid] = serial

            self._tfile.write(new.asString())

            # Finally, write the data or a backpointer.
            if data is None:
                if prev_pos:
                    self._tfile.write(p64(prev_pos))
                else:
                    # Write a zero backpointer, which indicates an
                    # un-creation transaction.
                    self._tfile.write(z64)
            else:
                self._tfile.write(data)
        finally:
            self._lock_release()

    def _restore_pnv(self, oid, prev, version, bp):
        # Find a valid pnv (previous non-version) pointer for this version.

        # If there is no previous record, there can't be a pnv.
        if not prev:
            return None

        # Load the record pointed to by prev
        h = self._read_data_header(prev, oid)
        if h.version:
            return h.pnv
        if h.back:
            # The previous record is not for this version, yet we have a
            # backpointer to it.  Follow it and see if that record is a
            # version record with a pnv.
            h2 = self._read_data_header(h.back, oid)
            if h2.version:
                return h2.pnv
        return None

    def supportsUndo(self):
        return 1

    def supportsVersions(self):
        return 1

    def _clear_temp(self):
        self._tindex.clear()
        self._tvindex.clear()
        self._toid2tid.clear()
        self._toid2tid_delete.clear()
        if self._tfile is not None:
            self._tfile.seek(0)

    def _begin(self, tid, u, d, e):
        self._nextpos = 0
        self._thl = TRANS_HDR_LEN + len(u) + len(d) + len(e)
        if self._thl > 65535:
            # One of u, d, or e may be > 65535.  We have to check lengths
            # here because struct.pack doesn't raise an exception on
            # overflow!
            if len(u) > 65535:
                raise FileStorageError('user name too long')
            if len(d) > 65535:
                raise FileStorageError('description too long')
            if len(e) > 65535:
                raise FileStorageError('too much extension data')

    def tpc_vote(self, transaction):
        self._lock_acquire()
        try:
            if transaction is not self._transaction:
                return
            dlen = self._tfile.tell()
            if not dlen:
                return  # No data in this trans
            self._tfile.seek(0)
            user, descr, ext = self._ude

            self._file.seek(self._pos)
            tl = self._thl + dlen

            try:
                # Write the transaction with a 'c' (checkpoint) status;
                # _finish() clears it once the transaction is final.
                h = TxnHeader(self._tid, tl, 'c', len(user),
                              len(descr), len(ext))
                h.user = user
                h.descr = descr
                h.ext = ext
                self._file.write(h.asString())
                cp(self._tfile, self._file, dlen)
                self._file.write(p64(tl))
                self._file.flush()
            except:
                # Hm, an error occurred writing out the data.  Maybe the
                # disk is full.  We don't want any turd at the end.
                self._file.truncate(self._pos)
                raise
            self._nextpos = self._pos + (tl + 8)
        finally:
            self._lock_release()
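    # A minimal sketch (not part of the original module) of driving the
    # storage-level two-phase commit directly; 't' is assumed to be a
    # transaction.Transaction instance, 'oid' an oid from new_oid(), and
    # 'old_tid' the tid returned by a prior load of that oid:
    #
    #   storage.tpc_begin(t)
    #   serial = storage.store(oid, old_tid, pickle_data, '', t)
    #   storage.tpc_vote(t)      # writes the records with status 'c' (above)
    #   storage.tpc_finish(t)    # _finish() flips status, updates indexes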
    # Keep track of the number of records that we've written
    _records_written = 0

    def _finish(self, tid, u, d, e):
        nextpos = self._nextpos
        if nextpos:
            file = self._file

            # Clear the checkpoint flag
            file.seek(self._pos + 16)
            file.write(self._tstatus)
            file.flush()

            if fsync is not None:
                fsync(file.fileno())

            self._pos = nextpos

            self._index.update(self._tindex)
            self._vindex.update(self._tvindex)
            self._oid2tid.update(self._toid2tid)
            for oid in self._toid2tid_delete.keys():
                try:
                    del self._oid2tid[oid]
                except KeyError:
                    pass

            # Update the number of records that we've written
            # +1 for the transaction record
            self._records_written += len(self._tindex) + 1
            if self._records_written >= self._records_before_save:
                self._save_index()
                self._records_written = 0
                self._records_before_save = max(self._records_before_save,
                                                len(self._index))

        self._ltid = tid

    def _abort(self):
        if self._nextpos:
            self._file.truncate(self._pos)
            self._nextpos = 0

    def supportsTransactionalUndo(self):
        return 1

    def _undoDataInfo(self, oid, pos, tpos):
        '''Return the tid, data pointer, data, and version for the oid
        record at pos'''
        if tpos:
            pos = tpos - self._pos - self._thl
            tpos = self._tfile.tell()
            h = self._tfmt._read_data_header(pos, oid)
            afile = self._tfile
        else:
            h = self._read_data_header(pos, oid)
            afile = self._file

        if h.oid != oid:
            raise UndoError('Invalid undo transaction id', oid)

        if h.plen:
            data = afile.read(h.plen)
        else:
            data = ''
            pos = h.back

        if tpos:
            # Restore temp file to end
            self._tfile.seek(tpos)

        return h.tid, pos, data, h.version

    def getTid(self, oid):
        self._lock_acquire()
        try:
            result = self._get_cached_tid(oid)
            if result is None:
                pos = self._lookup_pos(oid)
                result = self._getTid(oid, pos)
            return result
        finally:
            self._lock_release()

    def _getTid(self, oid, pos):
        self._file.seek(pos)
        h = self._file.read(16)
        assert oid == h[:8]
        return h[8:]

    def _getVersion(self, oid, pos):
        h = self._read_data_header(pos, oid)
        if h.version:
            return h.version, h.pnv
        else:
            return '', None

    def _transactionalUndoRecord(self, oid, pos, tid, pre, version):
        '''Get the undo information for a data record

        Return a 5-tuple consisting of a pickle, data pointer,
        version, packed non-version data pointer, and current
        position.  If the pickle is true, then the data pointer must
        be 0, but the pickle can be empty *and* the pointer 0.
        '''
        copy = 1  # Can we just copy a data pointer

        # First check if it is possible to undo this record.
        tpos = self._tindex.get(oid, 0)
        ipos = self._index.get(oid, 0)
        tipos = tpos or ipos

        if tipos != pos:
            # Eek, a later transaction modified the data, but,
            # maybe it is pointing at the same data we are.
            ctid, cdataptr, cdata, cver = self._undoDataInfo(oid, ipos, tpos)
            # Versions of undone record and current record *must* match!
            if cver != version:
                raise UndoError('Current and undone versions differ', oid)

            if cdataptr != pos:
                # We aren't sure if we are talking about the same data
                try:
                    if (
                        # The current record wrote a new pickle
                        cdataptr == tipos
                        or
                        # Backpointers are different
                        self._loadBackPOS(oid, pos) !=
                        self._loadBackPOS(oid, cdataptr)
                        ):
                        if pre and not tpos:
                            copy = 0  # try to do conflict resolution
                        else:
                            # We bail if we don't have a previous record,
                            # which should be impossible.
                            raise UndoError('no previous record', oid)
                except KeyError:
                    # _loadBack gave us a key error.  Bail.
                    raise UndoError('_loadBack() failed', oid)

        # Return the data that should be written in the undo record.
        if not pre:
            # There is no previous revision, because the object creation
            # is being undone.
            return '', 0, '', '', ipos

        version, snv = self._getVersion(oid, pre)
        if copy:
            # we can just copy our previous-record pointer forward
            return '', pre, version, snv, ipos

        try:
            bdata = self._loadBack_impl(oid, pre)[0]
        except KeyError:
            # couldn't find oid; what's the real explanation for this?
            raise UndoError('_loadBack() failed for %s', oid)

        data = self.tryToResolveConflict(oid, ctid, tid, bdata, cdata)
        if data:
            return data, 0, version, snv, ipos

        raise UndoError('Some data were modified by a later transaction', oid)

    # undoLog() returns a description dict with an 'id' entry.  The id is
    # opaque to the client, but contains the (base64'd) transaction id.

    def undoLog(self, first=0, last=-20, filter=None):
        if last < 0:
            last = first - last
        self._lock_acquire()
        try:
            if self._pack_is_in_progress:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')
            us = UndoSearch(self._file, self._pos, first, last, filter)
            while not us.finished():
                # Hold the lock for batches of 20 searches, so the default
                # search parameters will finish without letting another
                # thread run.
                for i in range(20):
                    if us.finished():
                        break
                    us.search()
                # Give another thread a chance, so that a long undoLog()
                # operation doesn't block all other activity.
                self._lock_release()
                self._lock_acquire()
            return us.results
        finally:
            self._lock_release()
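    # A minimal sketch (not part of the original module) of the undo API,
    # assuming an open read-write FileStorage wrapped in a DB:
    #
    #   log = storage.undoLog(0, 20)    # most recent 20 transactions
    #   tid_id = log[0]['id']           # base64 id, built by UndoSearch
    #   db.undo(tid_id)                 # schedules storage.undo()
    #   import transaction; transaction.commit()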
    def undo(self, transaction_id, transaction):
        '''Undo a transaction, given by transaction_id.

        Do so by writing new data that reverses the action taken by
        the transaction.

        Usually, we can get by with just copying a data pointer, by
        writing a file position rather than a pickle.  Sometimes, we
        may do conflict resolution, in which case we actually copy
        new data that results from resolution.
        '''
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            return self._txn_undo(transaction_id)
        finally:
            self._lock_release()

    def _txn_undo(self, transaction_id):
        # Find the right transaction to undo and call _txn_undo_write().
        tid = base64.decodestring(transaction_id + '\n')
        assert len(tid) == 8
        tpos = self._txn_find(tid, 1)
        tindex = self._txn_undo_write(tpos)
        self._tindex.update(tindex)
        # Arrange to clear the affected oids from the oid2tid cache.
        # It's too painful to try to update them to correct current
        # values instead.
        self._toid2tid_delete.update(tindex)
        return self._tid, tindex.keys()

    def _txn_find(self, tid, stop_at_pack):
        pos = self._pos
        while pos > 39:
            self._file.seek(pos - 8)
            pos = pos - u64(self._file.read(8)) - 8
            self._file.seek(pos)
            h = self._file.read(TRANS_HDR_LEN)
            _tid = h[:8]
            if _tid == tid:
                return pos
            if stop_at_pack:
                # Check the status field of the transaction header;
                # stop if we've reached a packed transaction.
                if h[16] == 'p':
                    break
        raise UndoError('Invalid transaction id')

    def _txn_undo_write(self, tpos):
        # A helper function to write the data records for transactional undo

        otloc = self._pos
        here = self._pos + self._tfile.tell() + self._thl
        base = here - self._tfile.tell()
        # Let's move the file pointer back to the start of the txn record.
        th = self._read_txn_header(tpos)
        if th.status != ' ':
            raise UndoError('non-undoable transaction')
        tend = tpos + th.tlen
        pos = tpos + th.headerlen()
        tindex = {}

        # keep track of failures, cause we may succeed later
        failures = {}
        # Read the data records for this transaction
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid in failures:
                del failures[h.oid]  # second chance!

            assert base + self._tfile.tell() == here, \
                   (here, base, self._tfile.tell())
            try:
                p, prev, v, snv, ipos = self._transactionalUndoRecord(
                    h.oid, pos, h.tid, h.prev, h.version)
            except UndoError, v:
                # Don't fail right away. We may be redeemed later!
                failures[h.oid] = v
            else:
                new = DataHeader(h.oid, self._tid, ipos, otloc, len(v),
                                 len(p))
                if v:
                    vprev = (self._tvindex.get(v, 0)
                             or self._vindex.get(v, 0))
                    new.setVersion(v, snv, vprev)
                    self._tvindex[v] = here

                assert self._tfile.tell() == here - base, \
                       (here, base, self._tfile.tell())
                self._tfile.write(new.asString())
                if p:
                    self._tfile.write(p)
                else:
                    self._tfile.write(p64(prev))
                tindex[h.oid] = here
                here += new.recordlen()

            pos += h.recordlen()
            if pos > tend:
                raise UndoError('non-undoable transaction')

        if failures:
            raise MultipleUndoErrors(failures.items())

        return tindex

    def versionEmpty(self, version):
        if not version:
            raise POSException.VersionError(
                'The version must be an non-empty string')
        self._lock_acquire()
        try:
            index = self._index
            file = self._file
            seek = file.seek
            read = file.read
            srcpos = self._vindex_get(version, 0)
            t = None
            tstatus = None
            while srcpos:
                seek(srcpos)
                oid = read(8)
                if index[oid] == srcpos:
                    return 0
                h = read(50)  # serial, prev(oid), tloc, vlen, plen, pnv, pv
                tloc = h[16:24]
                if t != tloc:
                    # We haven't checked this transaction before,
                    # get its status.
                    t = tloc
                    seek(u64(t) + 16)
                    tstatus = read(1)

                if tstatus != 'u':
                    return 1

                spos = h[-8:]
                srcpos = u64(spos)

            return 1
        finally:
            self._lock_release()

    def versions(self, max=None):
        r = []
        a = r.append
        keys = self._vindex.keys()
        if max is not None:
            keys = keys[:max]
        for version in keys:
            if self.versionEmpty(version):
                continue
            a(version)
            if max and len(r) >= max:
                return r
        return r

    def history(self, oid, version=None, size=1, filter=None):
        self._lock_acquire()
        try:
            r = []
            pos = self._lookup_pos(oid)
            wantver = version

            while 1:
                if len(r) >= size:
                    return r
                h = self._read_data_header(pos)

                if h.version:
                    if wantver is not None and h.version != wantver:
                        if h.prev:
                            pos = h.prev
                            continue
                        else:
                            return r
                else:
                    version = ''
                    wantver = None

                th = self._read_txn_header(h.tloc)
                if th.ext:
                    d = loads(th.ext)
                else:
                    d = {}

                d.update({'time': TimeStamp(h.tid).timeTime(),
                          'user_name': th.user,
                          'description': th.descr,
                          'tid': h.tid,
                          'version': h.version,
                          'size': h.plen,
                          })

                if filter is None or filter(d):
                    r.append(d)

                if h.prev:
                    pos = h.prev
                else:
                    return r
        finally:
            self._lock_release()
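    # A minimal sketch (not part of the original module) of history(),
    # assuming an open storage and the root object's oid:
    #
    #   from ZODB.utils import z64
    #   for d in storage.history(z64, size=5):
    #       print d['tid'].encode('hex'), d['user_name'], d['description']
    #       # each dict also carries 'time', 'version' and 'size' (above)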
    def _redundant_pack(self, file, pos):
        assert pos > 8, pos
        file.seek(pos - 8)
        p = u64(file.read(8))
        file.seek(pos - p + 8)
        return file.read(1) not in ' u'

    def pack(self, t, referencesf):
        '''Copy data from the current database file to a packed file

        Non-current records from transactions with time-stamp strings less
        than the pack time are omitted, as are all undone records.

        Also, data back pointers that point before the pack time are
        resolved and the associated data are copied, since the old records
        are not copied.
        '''
        if self._is_read_only:
            raise POSException.ReadOnlyError()

        stop = `TimeStamp(*time.gmtime(t)[:5] + (t % 60,))`
        if stop == z64:
            raise FileStorageError('Invalid pack time')

        # If the storage is empty, there's nothing to do.
        if not self._index:
            return

        self._lock_acquire()
        try:
            if self._pack_is_in_progress:
                raise FileStorageError('Already packing')
            self._pack_is_in_progress = True
            current_size = self.getSize()
        finally:
            self._lock_release()

        p = FileStoragePacker(self._file_name, stop,
                              self._lock_acquire, self._lock_release,
                              self._commit_lock_acquire,
                              self._commit_lock_release,
                              current_size)
        try:
            opos = None
            try:
                opos = p.pack()
            except RedundantPackWarning, detail:
                logger.info(str(detail))
            if opos is None:
                return
            oldpath = self._file_name + '.old'
            self._lock_acquire()
            try:
                self._file.close()
                try:
                    if os.path.exists(oldpath):
                        os.remove(oldpath)
                    os.rename(self._file_name, oldpath)
                except Exception:
                    self._file = open(self._file_name, 'r+b')
                    raise

                # OK, we're beyond the point of no return
                os.rename(self._file_name + '.pack', self._file_name)
                self._file = open(self._file_name, 'r+b')
                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
                                p.oid2tid, p.toid2tid, p.toid2tid_delete)
                self._pos = opos
                self._save_index()
            finally:
                self._lock_release()
        finally:
            if p.locked:
                self._commit_lock_release()
            self._lock_acquire()
            self._pack_is_in_progress = False
            self._lock_release()
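    # A minimal sketch (not part of the original module) of packing away
    # history older than one day; referencesf is ZODB's standard pickle
    # reference extractor:
    #
    #   import time
    #   from ZODB.serialize import referencesf
    #   storage.pack(time.time() - 86400, referencesf)
    #   # afterwards the pre-pack file survives as 'Data.fs.old'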
    def iterator(self, start=None, stop=None):
        return FileIterator(self._file_name, start, stop)

    def lastTransaction(self):
        '''Return transaction id for last committed transaction'''
        return self._ltid

    def lastTid(self, oid):
        '''Return last serialno committed for object oid.

        If there is no serialno for this oid -- which can only occur
        if it is a new object -- return None.
        '''
        try:
            return self.getTid(oid)
        except KeyError:
            return None

    def cleanup(self):
        '''Remove all files created by this storage.'''
        for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
            try:
                os.remove(self._file_name + ext)
            except OSError, e:
                if e.errno != errno.ENOENT:
                    raise

    def record_iternext(self, next=None):
        index = self._index
        oid = index.minKey(next)

        oid_as_long, = unpack('>Q', oid)
        next_oid = pack('>Q', oid_as_long + 1)
        try:
            next_oid = index.minKey(next_oid)
        except ValueError:  # "empty tree" error
            next_oid = None

        data, tid = self.load(oid, '')  # ignore versions

        return oid, tid, data, next_oid


def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
    '''Copy transactions forward in the data file

    This might be done as part of a recovery effort
    '''

    # Cache a bunch of methods
    seek = file.seek
    read = file.read
    write = file.write

    index_get = index.get
    vindex_get = vindex.get

    # Initialize,
    pv = z64
    p1 = opos
    p2 = pos
    offset = p2 - p1
    pnv = None

    while 1:
        # Read the transaction record
        h = read(TRANS_HDR_LEN)
        if len(h) < TRANS_HDR_LEN:
            break
        tid, stl, status, ul, dl, el = unpack(TRANS_HDR, h)
        if status == 'c':
            break  # Oops. we found a checkpoint flag.
        tl = u64(stl)
        tpos = pos
        tend = tpos + tl

        otpos = opos  # start pos of output trans

        thl = ul + dl + el
        h2 = read(thl)
        if len(h2) != thl:
            raise PackError(opos)

        # write out the transaction record
        seek(opos)
        write(h)
        write(h2)

        thl = TRANS_HDR_LEN + thl
        pos = tpos + thl
        opos = otpos + thl

        while pos < tend:
            # Read the data records for this transaction
            seek(pos)
            h = read(DATA_HDR_LEN)
            oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
            plen = u64(splen)
            dlen = DATA_HDR_LEN + (plen or 8)

            if vlen:
                dlen = dlen + 16 + vlen
                pnv = u64(read(8))
                # skip position of previous version record
                seek(8, 1)
                version = read(vlen)
                pv = p64(vindex_get(version, 0))
                if status != 'u':
                    vindex[version] = opos

            tindex[oid] = opos

            if plen:
                p = read(plen)
            else:
                p = read(8)
                p = u64(p)
                if p >= p2:
                    p = p - offset
                elif p >= p1:
                    # Ick, we're in trouble.  Let's bail to the index
                    # and hope for the best.
                    p = index_get(oid, 0)
                p = p64(p)

            # WRITE
            seek(opos)
            sprev = p64(index_get(oid, 0))
            write(pack(DATA_HDR, oid, serial, sprev, p64(otpos),
                       vlen, splen))

            if vlen:
                if not pnv:
                    write(z64)
                else:
                    if pnv >= p2:
                        pnv = pnv - offset
                    elif pnv >= p1:
                        pnv = index_get(oid, 0)
                    write(p64(pnv))
                write(pv)
                write(version)

            write(p)

            opos = opos + dlen
            pos = pos + dlen

        # skip the (intentionally redundant) transaction length
        pos = pos + 8

        if status != 'u':
            # Record the position
            index.update(tindex)

        tindex.clear()

        write(stl)
        opos = opos + 8

    return opos


def search_back(file, pos):
    seek = file.seek
    read = file.read
    seek(0, 2)
    s = p = file.tell()
    while p > pos:
        seek(p - 8)
        l = u64(read(8))
        if l <= 0:
            break
        p = p - l - 8
    return p, s


def recover(file_name):
    file = open(file_name, 'r+b')
    index = {}
    vindex = {}
    tindex = {}

    pos, oid, tid = read_index(file, file_name, index, vindex, tindex,
                               recover=1)
    if oid is not None:
        print 'Nothing to recover'
        return

    opos = pos
    pos, sz = search_back(file, pos)
    if pos < sz:
        npos = shift_transactions_forward(index, vindex, tindex, file,
                                          pos, opos)

    file.truncate(npos)

    print 'Recovered file, lost %s, ended up with %s bytes' % (
        pos - opos, npos)
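# A minimal sketch (not part of the original module) of running the
# recovery helper against a damaged data file:
#
#   recover('Data.fs')
#   # prints either 'Nothing to recover' or
#   # 'Recovered file, lost <n>, ended up with <m> bytes'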
def read_index(file, name, index, vindex, tindex, stop='\xff' * 8,
               ltid=z64, start=4L, maxoid=z64, recover=0, read_only=0):
    """Scan the file storage and update the index.

    Returns file position, max oid, and last transaction id.  It also
    stores index information in the three dictionary arguments.

    Arguments:
    file -- a file object (the Data.fs)
    name -- the name of the file (presumably file.name)
    index -- fsIndex, oid -> data record file offset
    vindex -- dictionary, oid -> data record offset for version data
    tindex -- dictionary, oid -> data record offset
              tindex is cleared before return

    There are several default arguments that affect the scan or the
    return values.  TODO:  document them.

    start -- the file position at which to start scanning for oids added
             beyond the ones the passed-in indices know about.  The .index
             file caches the highest ._pos FileStorage knew about when the
             .index file was last saved, and that's the intended value to
             pass in for start; accept the default (and pass empty indices)
             to recreate the index from scratch
    maxoid -- ignored (it meant something prior to ZODB 3.2.6; the argument
              still exists just so the signature of read_index() stayed the
              same)

    The file position returned is the position just after the last
    valid transaction record.  The oid returned is the maximum object
    id in `index`, or z64 if the index is empty.  The transaction id is
    the tid of the last transaction, or ltid if the index is empty.
    """

    read = file.read
    seek = file.seek
    seek(0, 2)
    file_size = file.tell()
    fmt = TempFormatter(file)

    if file_size:
        if file_size < start:
            raise FileStorageFormatError(file.name)
        seek(0)
        if read(4) != packed_version:
            raise FileStorageFormatError(name)
    else:
        if not read_only:
            file.write(packed_version)
        return 4L, z64, ltid

    index_get = index.get

    pos = start
    seek(start)
    tid = '\x00' * 7 + '\x01'

    while 1:
        # Read the transaction record
        h = read(TRANS_HDR_LEN)
        if not h:
            break
        if len(h) != TRANS_HDR_LEN:
            if not read_only:
                logger.warning('%s truncated at %s', name, pos)
                seek(pos)
                file.truncate()
            break

        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        if tid <= ltid:
            logger.warning('%s time-stamp reduction at %s', name, pos)
        ltid = tid

        if pos + (tl + 8) > file_size or status == 'c':
            # Hm, the data were truncated or the checkpoint flag wasn't
            # cleared.  They may also be corrupted, in which case we don't
            # want to totally lose the data.
            if not read_only:
                logger.warning('%s truncated, possibly due to damaged'
                               ' records at %s', name, pos)
                _truncate(file, name, pos)
            break

        if status not in ' up':
            logger.warning('%s has invalid status, %s, at %s',
                           name, status, pos)

        if tl < TRANS_HDR_LEN + ul + dl + el:
            # We're in trouble.  Find out if this is bad data in the
            # middle of the file, or just a turd that Win 9x dropped at
            # the end when the system crashed.  Skip to the end and read
            # what should be the transaction length of the last
            # transaction.
            seek(-8, 2)
            rtl = u64(read(8))
            # Now check to see if the redundant transaction length is
            # reasonable:
            if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
                logger.critical('%s has invalid transaction header at %s',
                                name, pos)
                if not read_only:
                    logger.warning(
                        'It appears that there is invalid data at the end '
                        'of the file, possibly due to a system crash.  %s '
                        'truncated to recover from bad data at end.' % name)
                    _truncate(file, name, pos)
                break
            else:
                if recover:
                    return pos, None, None
                panic('%s has invalid transaction header at %s', name, pos)

        if tid >= stop:
            break

        tpos = pos
        tend = tpos + tl

        if status == 'u':
            # Undone transaction, skip it
            seek(tend)
            h = u64(read(8))
            if h != tl:
                if recover:
                    return tpos, None, None
                panic('%s has inconsistent transaction length at %s',
                      name, pos)
            pos = tend + 8
            continue

        pos = tpos + TRANS_HDR_LEN + ul + dl + el
        while pos < tend:
            # Read the data records for this transaction
            h = fmt._read_data_header(pos)
            dlen = h.recordlen()
            tindex[h.oid] = pos
            if h.version:
                vindex[h.version] = pos

            if pos + dlen > tend or h.tloc != tpos:
                if recover:
                    return tpos, None, None
                panic('%s data record exceeds transaction record at %s',
                      name, pos)

            if index_get(h.oid, 0) != h.prev:
                if h.prev:
                    if recover:
                        return tpos, None, None
                    logger.error('%s incorrect previous pointer at %s',
                                 name, pos)
                else:
                    logger.warning('%s incorrect previous pointer at %s',
                                   name, pos)

            pos += dlen

        if pos != tend:
            if recover:
                return tpos, None, None
            panic("%s data records don't add up at %s", name, tpos)

        # Read the (intentionally redundant) transaction length
        seek(pos)
        h = u64(read(8))
        if h != tl:
            if recover:
                return tpos, None, None
            panic('%s redundant transaction length check failed at %s',
                  name, pos)
        pos += 8

        index.update(tindex)
        tindex.clear()

    try:
        maxoid = index.maxKey()
    except ValueError:
        # The index is empty.
        maxoid = z64

    return pos, maxoid, ltid
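# A minimal sketch (not part of the original module) of rebuilding the
# indexes for an existing Data.fs by scanning it read-only:
#
#   from ZODB.fsIndex import fsIndex
#   f = open('Data.fs', 'rb')
#   index, vindex, tindex = fsIndex(), {}, {}
#   pos, maxoid, ltid = read_index(f, 'Data.fs', index, vindex, tindex,
#                                  read_only=1)
#   print pos, u64(maxoid), TimeStamp(ltid)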
def _truncate(file, name, pos):
    file.seek(0, 2)
    file_size = file.tell()
    try:
        i = 0
        while 1:
            oname = '%s.tr%s' % (name, i)
            if os.path.exists(oname):
                i += 1
            else:
                logger.warning('Writing truncated data from %s to %s',
                               name, oname)
                o = open(oname, 'wb')
                file.seek(pos)
                cp(file, o, file_size - pos)
                o.close()
                break
    except:
        logger.error("couldn't write truncated data for %s", name,
                     exc_info=True)
        raise POSException.StorageSystemError("Couldn't save truncated data")

    file.seek(pos)
    file.truncate()


class Iterator:
    '''A General simple iterator that uses the Python for-loop index protocol
    '''
    __index = -1
    __current = None

    def __getitem__(self, i):
        __index = self.__index
        while i > __index:
            __index = __index + 1
            self.__current = self.next(__index)

        self.__index = __index
        return self.__current
class FileIterator(Iterator, FileStorageFormatter):
    '''Iterate over the transactions in a FileStorage file.
    '''
    _ltid = z64
    _file = None

    def __init__(self, file, start=None, stop=None):
        if isinstance(file, str):
            file = open(file, 'rb')
        self._file = file
        if file.read(4) != packed_version:
            raise FileStorageFormatError(file.name)
        file.seek(0, 2)
        self._file_size = file.tell()
        self._pos = 4L
        assert start is None or isinstance(start, str)
        assert stop is None or isinstance(stop, str)
        if start:
            self._skip_to_start(start)
        self._stop = stop

    def __len__(self):
        # Define a bogus __len__() to make the iterator work with code
        # like builtin list() and tuple() in Python 2.1.  There's a lot
        # of C code that expects a sequence to have an __len__() but can
        # cope with any sort of mistake in its implementation.  So just
        # return 0.
        return 0

    # This allows us to pass an iterator as the `other' argument to
    # copyTransactionsFrom() in BaseStorage.
    def iterator(self):
        return self

    def close(self):
        file = self._file
        if file is not None:
            self._file = None
            file.close()

    def _skip_to_start(self, start):
        # Scan through the transaction records doing almost no sanity
        # checks.
        file = self._file
        read = file.read
        seek = file.seek
        while 1:
            seek(self._pos)
            h = read(16)
            if len(h) < 16:
                return
            tid, stl = unpack('>8s8s', h)
            if tid >= start:
                return
            tl = u64(stl)
            try:
                self._pos += tl + 8
            except OverflowError:
                self._pos = long(self._pos) + tl + 8
            if __debug__:
                # Sanity check
                seek(self._pos - 8, 0)
                rtl = read(8)
                if rtl != stl:
                    pos = file.tell() - 8
                    panic('%s has inconsistent transaction length at %s '
                          '(%s != %s)', file.name, pos, u64(rtl), u64(stl))

    def next(self, index=0):
        if self._file is None:
            raise IOError('iterator is closed')

        pos = self._pos
        while 1:
            # Read the transaction record
            try:
                h = self._read_txn_header(pos)
            except CorruptedDataError, err:
                # If buf is empty, we've reached EOF.
                if not err.buf:
                    break
                raise

            if h.tid <= self._ltid:
                logger.warning('%s time-stamp reduction at %s',
                               self._file.name, pos)
            self._ltid = h.tid

            if self._stop is not None and h.tid > self._stop:
                raise IndexError(index)

            if h.status == 'c':
                # Assume we've hit the last, in-progress transaction
                raise IndexError(index)

            if pos + h.tlen + 8 > self._file_size:
                # Hm, the data were truncated or the checkpoint flag
                # wasn't cleared.  They may also be corrupted, in which
                # case we don't want to totally lose the data.
                logger.warning('%s truncated, possibly due to damaged'
                               ' records at %s', self._file.name, pos)
                break

            if h.status not in ' up':
                logger.warning('%s has invalid status, %s, at %s',
                               self._file.name, h.status, pos)

            if h.tlen < h.headerlen():
                # We're in trouble.  Find out if this is bad data in the
                # middle of the file, or just a turd that Win 9x dropped
                # at the end when the system crashed.  Skip to the end
                # and read what should be the transaction length of the
                # last transaction.
                self._file.seek(-8, 2)
                rtl = u64(self._file.read(8))
                # Now check to see if the redundant transaction length is
                # reasonable:
                if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
                    logger.critical('%s has invalid transaction header'
                                    ' at %s', self._file.name, pos)
                    logger.warning(
                        'It appears that there is invalid data at the end '
                        'of the file, possibly due to a system crash.  %s '
                        'truncated to recover from bad data at end.'
                        % self._file.name)
                    break
                else:
                    logger.warning('%s has invalid transaction header'
                                   ' at %s', self._file.name, pos)
                    break
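# A minimal sketch (not part of the original module) of walking a data file
# with FileIterator; each transaction yields Record objects (see below):
#
#   it = FileIterator('Data.fs')
#   for txn in it:
#       print TimeStamp(txn.tid), txn.user, txn.description
#       for rec in txn:
#           print '  oid %r, %s bytes' % (rec.oid, len(rec.data or ''))
#   it.close()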
            tpos = pos
            tend = tpos + h.tlen

            if h.status != 'u':
                pos = tpos + h.headerlen()
                e = {}
                if h.elen:
                    try:
                        e = loads(h.ext)
                    except:
                        pass

                result = RecordIterator(h.tid, h.status, h.user, h.descr,
                                        e, pos, tend, self._file, tpos)

            # Read the (intentionally redundant) transaction length
            self._file.seek(tend)
            rtl = u64(self._file.read(8))
            if rtl != h.tlen:
                logger.warning('%s redundant transaction length check'
                               ' failed at %s', self._file.name, tend)
                break
            self._pos = tend + 8

            return result

        raise IndexError(index)


class RecordIterator(Iterator, BaseStorage.TransactionRecord,
                     FileStorageFormatter):
    '''Iterate over the transactions in a FileStorage file.'''

    def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
        self.tid = tid
        self.status = status
        self.user = user
        self.description = desc
        self._extension = ext
        self._pos = pos
        self._tend = tend
        self._file = file
        self._tpos = tpos

    def next(self, index=0):
        pos = self._pos
        while pos < self._tend:
            # Read the data records for this transaction
            h = self._read_data_header(pos)
            dlen = h.recordlen()

            if pos + dlen > self._tend or h.tloc != self._tpos:
                logger.warning('%s data record exceeds transaction'
                               ' record at %s', self._file.name, pos)
                break

            self._pos = pos + dlen
            prev_txn = None
            if h.plen:
                data = self._file.read(h.plen)
            else:
                if h.back == 0:
                    # If the backpointer is 0, then this transaction
                    # undoes the object creation.  It either aborts the
                    # version that created the object or undid the
                    # transaction that created it.  Return None instead
                    # of a pickle to indicate this.
                    data = None
                else:
                    data, tid = self._loadBackTxn(h.oid, h.back, False)
                    # Caution: the tid is the tid of the transaction
                    # with the original data.
                    prev_txn = self.getTxnFromData(h.oid, h.back)

            r = Record(h.oid, h.tid, h.version, data, prev_txn, pos)
            return r

        raise IndexError(index)


class Record(BaseStorage.DataRecord):
    '''An abstract database record.'''

    def __init__(self, oid, tid, version, data, prev, pos):
        self.oid = oid
        self.tid = tid
        self.version = version
        self.data = data
        self.data_txn = prev
        self.pos = pos


class UndoSearch:

    def __init__(self, file, pos, first, last, filter=None):
        self.file = file
        self.pos = pos
        self.first = first
        self.last = last
        self.filter = filter
        self.i = 0
        self.results = []
        self.stop = False

    def finished(self):
        '''Return True if UndoSearch has found enough records.'''
        return self.i >= self.last or self.pos < 39 or self.stop

    def search(self):
        '''Search for another record.'''
        dict = self._readnext()
        if dict is not None and (self.filter is None or self.filter(dict)):
            if self.i >= self.first:
                self.results.append(dict)
            self.i += 1

    def _readnext(self):
        '''Read the next record from the storage.'''
        self.file.seek(self.pos - 8)
        self.pos -= u64(self.file.read(8)) + 8
        self.file.seek(self.pos)
        h = self.file.read(TRANS_HDR_LEN)
        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        if status == 'p':
            # We've reached a packed transaction; stop the search.
            self.stop = 1
            return None
        if status != ' ':
            return None
        d = u = ''
        if ul:
            u = self.file.read(ul)
        if dl:
            d = self.file.read(dl)
        e = {}
        if el:
            try:
                e = loads(self.file.read(el))
            except:
                pass
        d = {'id': base64.encodestring(tid).rstrip(),
             'time': TimeStamp(tid).timeTime(),
             'user_name': u,
             'size': tl,
             'description': d}
        d.update(e)
        return d